{
  "metadata" : {
    "id" : "cf9e2ab4-bb67-45fe-bee2-03e91e3f8495",
    "name" : "mapgroupswithstate-n-moving-average.snb.ipynb",
    "user_save_timestamp" : "2018-07-08T22:44:40.310Z",
    "auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "sparkNotebook" : null,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : null,
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null,
    "customVars" : null
  },
  "cells" : [ {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "9B48F85A440847599ABAC8D20BB1BE7A"
    },
    "cell_type" : "code",
    "source" : [ "import java.sql.Timestamp\n", "case class Rate(timestamp: Timestamp, value: Long)" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "import java.sql.Timestamp\ndefined class Rate\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 1,
      "time" : "Took: 1.008s, at 2018-07-08 19:20"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "96F16FB4DD5E48C68E6AD1CFDCB40CAD"
    },
    "cell_type" : "code",
    "source" : [ "val rate = sparkSession.readStream.format(\"rate\").load().as[Rate]" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "rate: org.apache.spark.sql.Dataset[Rate] = [timestamp: timestamp, value: bigint]\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 2,
      "time" : "Took: 2.023s, at 2018-07-08 19:20"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "E041DCED2C0C4E788163C5F3C6B78E9F"
    },
    "cell_type" : "code",
    "source" : [ "val uids = List(\"d1e46a42\", \"d8e16e2a\", \"d1b06f88\", \n", "                \"d2e710aa\", \"d2f731cc\", \"d4c162ee\", \n", "                \"d4a11632\", \"d7e277b2\", \"d59018de\", \n", "                \"d60779f6\" )" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "uids: List[String] = List(d1e46a42, d8e16e2a, d1b06f88, d2e710aa, d2f731cc, d4c162ee, d4a11632, d7e277b2, d59018de, d60779f6)\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 3,
      "time" : "Took: 0.890s, at 2018-07-08 19:20"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "5F981F37D55349978CBC51F3A2EB7A01"
    },
    "cell_type" : "code",
    "source" : [ "val locationGenerator: () => (Double, Double) = {\n", "  // Europe bounds \n", "  val longBounds = (-10.89,39.82)\n", "  val latBounds = (35.52,56.7)\n", "  def pointInRange(bounds:(Double, Double)): Double = {\n", "    val (a, b) = bounds\n", "    Math.abs(scala.util.Random.nextDouble())*b+a\n", "  }\n", "  () => (pointInRange(longBounds), pointInRange(latBounds))\n", "}    " ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "locationGenerator: () => (Double, Double) = <function0>\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 4,
      "time" : "Took: 0.744s, at 2018-07-08 19:20"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "58404D4D92864DFD86BA95204F86FF42"
    },
    "cell_type" : "code",
    "source" : [ "def pickOne[T](list: List[T]): T = list(scala.util.Random.nextInt(list.size))" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "pickOne: [T](list: List[T])T\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 5,
      "time" : "Took: 0.680s, at 2018-07-08 19:20"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "02A8945CE374431F8F5269DCE7A8A4D3"
    },
    "cell_type" : "code",
    "source" : [ "val pressureGen: () => Double = () => scala.util.Random.nextDouble + 101.0\n", "val tempGen: () => Double = () => scala.util.Random.nextDouble * 60 - 20" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "pressureGen: () => Double = <function0>\ntempGen: () => Double = <function0>\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 6,
      "time" : "Took: 0.624s, at 2018-07-08 19:20"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "37D0E9DA64FC43D3A526B7386E09DF2D"
    },
    "cell_type" : "code",
    "source" : [ "import java.sql.Timestamp\n", "case class WeatherEvent(stationId: String, timestamp: Timestamp, location:(Double,Double), pressure: Double, temp: Double)" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "import java.sql.Timestamp\ndefined class WeatherEvent\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 7,
      "time" : "Took: 0.639s, at 2018-07-08 19:20"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "5D9C60387781455DBC4AF900923E935B"
    },
    "cell_type" : "code",
    "source" : [ "val weatherEvents = rate.map{case Rate(ts, value) => WeatherEvent(pickOne(uids), ts, locationGenerator(), pressureGen(), tempGen())}" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "weatherEvents: org.apache.spark.sql.Dataset[WeatherEvent] = [stationId: string, timestamp: timestamp ... 3 more fields]\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 8,
      "time" : "Took: 0.835s, at 2018-07-08 19:20"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "6E647F3B92404A5686145CE08F06AB07"
    },
    "cell_type" : "code",
    "source" : [ "import scala.collection.immutable.Queue\n", "case class FIFOBuffer[T](capacity: Int, data: Queue[T] = Queue.empty) extends Serializable {\n", "  def add(element: T): FIFOBuffer[T] = this.copy(data = data.enqueue(element).take(capacity))\n", "  def get: List[T] = data.toList\n", "  def size: Int = data.size\n", "}" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "import scala.collection.immutable.Queue\ndefined class FIFOBuffer\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 9,
      "time" : "Took: 0.548s, at 2018-07-08 19:20"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "65092AC8EE20426D94919449A91B14A2"
    },
    "cell_type" : "code",
    "source" : [ "import java.sql.Timestamp\n", "case class WeatherEventAverage(stationId: String, \n", "                               startTime: Timestamp, \n", "                               endTime:Timestamp, \n", "                               pressureAvg: Double, \n", "                               tempAvg: Double)" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "import java.sql.Timestamp\ndefined class WeatherEventAverage\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 10,
      "time" : "Took: 0.501s, at 2018-07-08 19:20"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "DBAF0B87708D4CB5AA547B33B04F8863"
    },
    "cell_type" : "code",
    "source" : [ "import org.apache.spark.sql.streaming.GroupState\n", "def mappingFunction(key: String, values: Iterator[WeatherEvent], state: GroupState[FIFOBuffer[WeatherEvent]]): WeatherEventAverage = {\n", "  val ElementCountWindowSize = 10\n", "  // get current state or create a new one if there's no previous state\n", "  val currentState = state.getOption.getOrElse(new FIFOBuffer[WeatherEvent](ElementCountWindowSize))\n", "  // enrich the state with the new events\n", "  val updatedState = values.foldLeft(currentState){case (st, ev) => st.add(ev)}\n", "  // update the state with the enriched state\n", "  state.update(updatedState)\n", "  // if we have enough data, create a WeatherEventAverage from the accumulated state\n", "  // otherwise, make a zeroed record\n", "  val data = updatedState.get\n", "  if (data.size > 2) {\n", "    val start = data.head\n", "    val end = data.last\n", "    val pressureAvg = data.map(event => event.pressure).sum/data.size\n", "    val tempAvg = data.map(event => event.temp).sum/data.size\n", "    WeatherEventAverage(key, start.timestamp, end.timestamp, pressureAvg, tempAvg)\n", "  } else {\n", "    WeatherEventAverage(key, new Timestamp(0), new Timestamp(0), 0.0, 0.0)\n", "  }\n", "}                                              " ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "import org.apache.spark.sql.streaming.GroupState\nmappingFunction: (key: String, values: Iterator[WeatherEvent], state: org.apache.spark.sql.streaming.GroupState[FIFOBuffer[WeatherEvent]])WeatherEventAverage\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 11,
      "time" : "Took: 0.716s, at 2018-07-08 19:20"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "7FF731E763BF4DBA8761BFB03CDC23B4"
    },
    "cell_type" : "code",
    "source" : [ "import org.apache.spark.sql.streaming.GroupStateTimeout\n", "val weatherEventsMovingAverage = weatherEvents.groupByKey(record => record.stationId)\n", ".mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(mappingFunction)" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "import org.apache.spark.sql.streaming.GroupStateTimeout\nweatherEventsMovingAverage: org.apache.spark.sql.Dataset[WeatherEventAverage] = [stationId: string, startTime: timestamp ... 3 more fields]\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 12,
      "time" : "Took: 0.815s, at 2018-07-08 19:20"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "BAD771D079B6431286C956296D43BC66"
    },
    "cell_type" : "code",
    "source" : [ "val outQuery = weatherEventsMovingAverage.writeStream\n", "  .format(\"memory\")\n", "  .queryName(\"weatherAverage\")\n", "  .outputMode(\"update\")\n", "  .start()" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "outQuery: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@533df10c\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 13,
      "time" : "Took: 1.217s, at 2018-07-08 19:20"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "output_stream_collapsed" : true,
      "collapsed" : false,
      "id" : "7778099F1C3947A2873026A40087F405"
    },
    "cell_type" : "code",
    "source" : [ "outQuery.stop()" ],
    "outputs" : [ {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 20,
      "time" : "Took: 1.340s, at 2018-07-08 18:49"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "13482567EBD3442EA7BF629FADA559F3"
    },
    "cell_type" : "code",
    "source" : [ "val table  = sparkSession.sql(\"select * from weatherAverage where pressureAvg == 0.0\")" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "table: org.apache.spark.sql.DataFrame = [stationId: string, startTime: timestamp ... 3 more fields]\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 20,
      "time" : "Took: 0.514s, at 2018-07-08 22:01"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "1FBDD7607D83498C89FED6E46F7B9124"
    },
    "cell_type" : "code",
    "source" : [ "table.show(truncate= false)" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "+---------+-------------------+-------------------+-----------+-------+\n|stationId|startTime          |endTime            |pressureAvg|tempAvg|\n+---------+-------------------+-------------------+-----------+-------+\n|d2e710aa |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d1e46a42 |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d4a11632 |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d4c162ee |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d1e46a42 |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d60779f6 |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d2f731cc |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d1b06f88 |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d8e16e2a |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d8e16e2a |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d4c162ee |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d1b06f88 |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d60779f6 |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d7e277b2 |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d2f731cc |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d2e710aa |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d4a11632 |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d7e277b2 |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d59018de |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n|d59018de |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |\n+---------+-------------------+-------------------+-----------+-------+\n\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 21,
      "time" : "Took: 0.594s, at 2018-07-08 22:01"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "8CA3C13B054C4AFBAB9BC533EB2B0349"
    },
    "cell_type" : "code",
    "source" : [ "outQuery.stop" ],
    "outputs" : [ {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 17,
      "time" : "Took: 0.970s, at 2018-07-08 19:21"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "8C78F48DDF6A44DDB6ADE3A9CE97EA98"
    },
    "cell_type" : "code",
    "source" : [ "val m5 = 1000*60*5\n", "val next5MinSlot = ((System.currentTimeMillis + m5)/m5)*m5\n", "val m5Date = new java.sql.Timestamp(next5MinSlot)" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "m5: Int = 300000\nnext5MinSlot: Long = 1530482100000\nm5Date: java.sql.Timestamp = 2018-07-01 23:55:00.0\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 21,
      "time" : "Took: 0.759s, at 2018-07-01 23:50"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "6CDCA20587C5411C85DDD3DA3D16DE5F"
    },
    "cell_type" : "code",
    "source" : [ "import org.apache.spark.sql.functions._\n", "import org.apache.spark.sql.types._\n", "\n", "\n", "val perMinuteAvg = weatherDF\n", "    .withWatermark(\"timestamp\",\"0 minutes\")\n", "    .groupBy(window($\"timestamp\",\"15 minute\", \"15 minute\", \"0 minute\"))\n", "    //.agg(avg($\"pressure\") as \"pressureAvg\", avg($\"temp\") as \"tempAvg\")\n", "    .agg(count($\"stationId\"))" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "import org.apache.spark.sql.functions._\nimport org.apache.spark.sql.types._\nperMinuteAvg: org.apache.spark.sql.DataFrame = [window: struct<start: timestamp, end: timestamp>, count(stationId): bigint]\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 33,
      "time" : "Took: 0.469s, at 2018-06-24 23:50"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "AC411FD8B06D468A8C687BB7A11965B9"
    },
    "cell_type" : "code",
    "source" : [ "perMinuteAvg.printSchema" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "root\n |-- window: struct (nullable = true)\n |    |-- start: timestamp (nullable = true)\n |    |-- end: timestamp (nullable = true)\n |-- count(stationId): long (nullable = false)\n\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 34,
      "time" : "Took: 0.597s, at 2018-06-24 23:50"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "7C03488C5102494F84150E655B390BE6"
    },
    "cell_type" : "code",
    "source" : [ "val query = perMinuteAvg.writeStream.format(\"memory\").outputMode(\"append\").queryName(\"tenMinAvg\").start()" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@618e8f71\n"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "84CC507304434D2FA48A217B9C661D40"
    },
    "cell_type" : "code",
    "source" : [ "val df = sparkSession.sql(\"select * from tenMinAvg\")" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "df: org.apache.spark.sql.DataFrame = [window: struct<start: timestamp, end: timestamp>, count(stationId): bigint]\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 36,
      "time" : "Took: 0.839s, at 2018-06-24 23:50"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "D15D7263AD80430A8FC64CF6B2C80408"
    },
    "cell_type" : "code",
    "source" : [ "df.show(truncate=false)" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "+---------------------------------------------+----------------+\n|window                                       |count(stationId)|\n+---------------------------------------------+----------------+\n|[2018-06-24 23:45:00.0,2018-06-25 00:00:00.0]|587             |\n+---------------------------------------------+----------------+\n\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 38,
      "time" : "Took: 1.128s, at 2018-06-25 00:01"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "6F3C9D4BCE324BA0AECBFB3B417684AF"
    },
    "cell_type" : "code",
    "source" : [ "query.stop\n" ],
    "outputs" : [ {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 32,
      "time" : "Took: 0.804s, at 2018-06-24 23:49"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "F53F90F7DA02425D8014674508D7DCB6"
    },
    "cell_type" : "code",
    "source" : [ "df.show" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "+--------------------+---------+------------------+-------------------+\n|              window|stationId|       pressureAvg|            tempAvg|\n+--------------------+---------+------------------+-------------------+\n|[1970-01-18 00:00...| d7e771cc|101.01088217991148|-5.8995974443145744|\n|[1970-01-18 00:00...| d7e76a42|101.84763550125572| 16.365236842957728|\n|[1970-01-18 00:00...| d7e76f88| 101.9718887595217| 34.596594920941314|\n|[1970-01-18 00:00...| d7e77672|101.15962690464693| 18.331916524151467|\n|[1970-01-18 00:00...| d7e77672|101.33512527465564| 14.472673136283124|\n|[1970-01-18 00:00...| d7e770aa|101.40406589851482| 12.631897019107264|\n|[1970-01-18 00:00...| d7e778de|101.65262104046391| 30.251680016296675|\n|[1970-01-18 00:00...| d7e772ee|101.49646793425526| 10.075008485762066|\n|[1970-01-18 00:00...| d7e76a42|101.58496517622028| 1.5693482754141428|\n|[1970-01-18 00:00...| d7e779f6|  101.603830921723| 20.764354974202504|\n|[1970-01-18 00:00...| d7e76e2a|101.47581528098183| 28.273156326472247|\n|[1970-01-18 00:00...| d7e771cc|101.22096012488818| -2.723741134194815|\n|[1970-01-18 00:00...| d7e777b2|101.20686544720449| 16.600885095664438|\n|[1970-01-18 00:00...| d7e77672|101.43296921555992|  8.578856993449158|\n|[1970-01-18 00:00...| d7e771cc|101.30579140600571| -6.249646394577954|\n|[1970-01-18 00:00...| d7e77672|101.45101113969656| 2.9677124712538143|\n|[1970-01-18 00:00...| d7e770aa|101.48617036734416|  8.896303222320139|\n|[1970-01-18 00:00...| d7e779f6| 101.3411174056723| 25.249754767701123|\n|[1970-01-18 00:00...| d7e76e2a|101.44798061812338|  32.45599638820388|\n|[1970-01-18 00:00...| d7e772ee|101.52360800080652|  1.858552829842818|\n+--------------------+---------+------------------+-------------------+\nonly showing top 20 rows\n\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 56,
      "time" : "Took: 1.298s, at 2018-06-16 23:57"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "960EBAE607A04FF7875AE4ECB31ED273"
    },
    "cell_type" : "code",
    "source" : [ "df.printSchema" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "root\n |-- window: struct (nullable = true)\n |    |-- start: timestamp (nullable = true)\n |    |-- end: timestamp (nullable = true)\n |-- pressureAvg: double (nullable = true)\n |-- tempAvg: double (nullable = true)\n\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 31,
      "time" : "Took: 0.689s, at 2018-06-17 23:42"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "D3941F2F136C44F2ACBB3A4DB7397EA0"
    },
    "cell_type" : "code",
    "source" : [ "val q2 = weatherEvents.writeStream.format(\"memory\").queryName(\"raw2\").outputMode(\"update\").start()" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "q2: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@7f025feb\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 64,
      "time" : "Took: 1.025s, at 2018-06-17 00:03"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "7DF938EE81FF4152B48EC96C8B371ECF"
    },
    "cell_type" : "code",
    "source" : [ "sparkSession.sql(\"select * from raw2\")" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "res84: org.apache.spark.sql.DataFrame = [stationId: string, ts: date ... 3 more fields]\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : "<div class=\"df-canvas\">\n      <script data-this=\"{&quot;dataId&quot;:&quot;anon6839a3e06c6f9facea3260dfbab86f94&quot;,&quot;partitionIndexId&quot;:&quot;anon2cfa224d8459e631a367fe7f54d897b1&quot;,&quot;numPartitions&quot;:1,&quot;dfSchema&quot;:{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;name&quot;:&quot;stationId&quot;,&quot;type&quot;:&quot;string&quot;,&quot;nullable&quot;:true,&quot;metadata&quot;:{}},{&quot;name&quot;:&quot;ts&quot;,&quot;type&quot;:&quot;date&quot;,&quot;nullable&quot;:true,&quot;metadata&quot;:{}},{&quot;name&quot;:&quot;location&quot;,&quot;type&quot;:{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;name&quot;:&quot;_1&quot;,&quot;type&quot;:&quot;double&quot;,&quot;nullable&quot;:false,&quot;metadata&quot;:{}},{&quot;name&quot;:&quot;_2&quot;,&quot;type&quot;:&quot;double&quot;,&quot;nullable&quot;:false,&quot;metadata&quot;:{}}]},&quot;nullable&quot;:true,&quot;metadata&quot;:{}},{&quot;name&quot;:&quot;pressure&quot;,&quot;type&quot;:&quot;double&quot;,&quot;nullable&quot;:false,&quot;metadata&quot;:{}},{&quot;name&quot;:&quot;temp&quot;,&quot;type&quot;:&quot;double&quot;,&quot;nullable&quot;:false,&quot;metadata&quot;:{}}]}}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/dataframe','../javascripts/notebook/consoleDir'], \n      function(dataframe, extension) {\n        dataframe.call(data, this, extension);\n      }\n    );/*]]>*/</script>\n      <link rel=\"stylesheet\" href=\"/assets/stylesheets/ipython/css/dataframe.css\" type=\"text/css\"/>\n    </div>"
      },
      "output_type" : "execute_result",
      "execution_count" : 65,
      "time" : "Took: 2.481s, at 2018-06-17 00:03"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "B022E2782EDE479FAA462433E81A89ED"
    },
    "cell_type" : "code",
    "source" : [ "rate.writeStream.format(\"memory\").queryName(\"rate\").start" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "res77: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@7e4eeefb\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : "org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@7e4eeefb"
      },
      "output_type" : "execute_result",
      "execution_count" : 60,
      "time" : "Took: 1.155s, at 2018-06-17 00:00"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "47AEABA8432946B9B06A5050BE2966E4"
    },
    "cell_type" : "code",
    "source" : [ "sparkSession.sql(\"select * from rate\")" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "res79: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : "<div class=\"df-canvas\">\n      <script data-this=\"{&quot;dataId&quot;:&quot;anon3181226a7e6d1be58bc5c28903137120&quot;,&quot;partitionIndexId&quot;:&quot;anona82441a2e3aa6edb618da85288a338bf&quot;,&quot;numPartitions&quot;:1,&quot;dfSchema&quot;:{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;name&quot;:&quot;timestamp&quot;,&quot;type&quot;:&quot;timestamp&quot;,&quot;nullable&quot;:true,&quot;metadata&quot;:{}},{&quot;name&quot;:&quot;value&quot;,&quot;type&quot;:&quot;long&quot;,&quot;nullable&quot;:true,&quot;metadata&quot;:{}}]}}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/req(['../javascripts/notebook/dataframe','../javascripts/notebook/consoleDir'], \n      function(dataframe, extension) {\n        dataframe.call(data, this, extension);\n      }\n    );/*]]>*/</script>\n      <link rel=\"stylesheet\" href=\"/assets/stylesheets/ipython/css/dataframe.css\" type=\"text/css\"/>\n    </div>"
      },
      "output_type" : "execute_result",
      "execution_count" : 61,
      "time" : "Took: 3.734s, at 2018-06-17 00:00"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "7F135F18F37A4953B65C46FF7B4D6F3E"
    },
    "cell_type" : "code",
    "source" : [ "val ts = System.currentTimeMillis" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "ts: Long = 1529240368112\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 19,
      "time" : "Took: 0.476s, at 2018-06-17 14:59"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "859CAD5077E14B6E90930E4033DD2454"
    },
    "cell_type" : "code",
    "source" : [ "val sqlTs = new java.sql.Timestamp(ts)" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "sqlTs: java.sql.Timestamp = 2018-06-17 14:59:28.112\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 20,
      "time" : "Took: 0.477s, at 2018-06-17 14:59"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "A666A65FF31943B98AA36E6B5579F59C"
    },
    "cell_type" : "code",
    "source" : [ "val rates = (1 to 10).map(i => Rate(ts, i))" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "rates: scala.collection.immutable.IndexedSeq[Rate] = Vector(Rate(1529247155834,1), Rate(1529247155834,2), Rate(1529247155834,3), Rate(1529247155834,4), Rate(1529247155834,5), Rate(1529247155834,6), Rate(1529247155834,7), Rate(1529247155834,8), Rate(1529247155834,9), Rate(1529247155834,10))\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 36,
      "time" : "Took: 0.566s, at 2018-06-17 17:14"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "ED78828B50C042B9A4016342E0FA7B90"
    },
    "cell_type" : "code",
    "source" : [ "val spark = sparkSession\n", "import spark.implicits._\n", "val ratesDS= rates.toDS\n", "val tsDS = rates.map{case Rate(ts, value) => (ts, new java.sql.Timestamp(ts))}.toDF(\"ts\", \"sqlts\")" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1a0895e\nimport spark.implicits._\nratesDS: org.apache.spark.sql.Dataset[Rate] = [timestamp: bigint, value: bigint]\ntsDS: org.apache.spark.sql.DataFrame = [ts: bigint, sqlts: timestamp]\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 41,
      "time" : "Took: 1.237s, at 2018-06-17 17:17"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "CBFE92825C0F44008D996DDCAC743C39"
    },
    "cell_type" : "code",
    "source" : [ "val query = tsDS.select($\"ts\", $\"sqlts\", $\"sqlts\".cast(\"Long\"))" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "query: org.apache.spark.sql.DataFrame = [ts: bigint, sqlts: timestamp ... 1 more field]\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 42,
      "time" : "Took: 0.625s, at 2018-06-17 17:17"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "A9E54385DAFE4956861E7D421DE3B444"
    },
    "cell_type" : "code",
    "source" : [ "query.show" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "+-------------+--------------------+----------+\n|           ts|               sqlts|     sqlts|\n+-------------+--------------------+----------+\n|1529247155834|2018-06-17 16:52:...|1529247155|\n|1529247155834|2018-06-17 16:52:...|1529247155|\n|1529247155834|2018-06-17 16:52:...|1529247155|\n|1529247155834|2018-06-17 16:52:...|1529247155|\n|1529247155834|2018-06-17 16:52:...|1529247155|\n|1529247155834|2018-06-17 16:52:...|1529247155|\n|1529247155834|2018-06-17 16:52:...|1529247155|\n|1529247155834|2018-06-17 16:52:...|1529247155|\n|1529247155834|2018-06-17 16:52:...|1529247155|\n|1529247155834|2018-06-17 16:52:...|1529247155|\n+-------------+--------------------+----------+\n\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 43,
      "time" : "Took: 0.855s, at 2018-06-17 17:17"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "2E81FDAE71174287860E457B81A1681E"
    },
    "cell_type" : "code",
    "source" : [ "" ],
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}